{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# KubeFlow Pipeline DSL Static Type Checking\n",
    "\n",
    "In this notebook, we will demo: \n",
    "\n",
    "* Defining a KubeFlow pipeline with Python DSL\n",
    "* Compile the pipeline with type checking\n",
    "\n",
    "Static type checking helps users to identify component I/O inconsistencies without running the pipeline. It also shortens the development cycles by catching the errors early. This feature is especially useful in two cases: 1) when the pipeline is huge and manually checking the types is infeasible; 2) when some components are shared ones and the type information is not immediately avaiable to the pipeline authors.\n",
    "\n",
    "Since this sample focuses on the DSL type checking, we will use components that are not runnable in the system but with various type checking scenarios. \n",
    "\n",
    "## Component definition\n",
    "Components can be defined in either YAML or functions decorated by dsl.component.\n",
    "\n",
    "## Type definition\n",
    "Types can be defined as string or a dictionary with the openapi_schema_validator property formatted as:\n",
    "```yaml\n",
    "{\n",
    "    type_name: {\n",
    "        openapi_schema_validator: {\n",
    "        }\n",
    "    }\n",
    "}\n",
    "```\n",
    "For example, the following yaml declares a GCSPath type with the openapi_schema_validator for output field_m.\n",
    "The type could also be a plain string, such as the GcsUri. The type name could be either one of the core types or customized ones.\n",
    "```yaml\n",
    "name: component a\n",
    "description: component a desc\n",
    "inputs:\n",
    "  - {name: field_l, type: Integer}\n",
    "outputs:\n",
    "  - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
    "  - {name: field_n, type: customized_type}\n",
    "  - {name: field_o, type: GcsUri} \n",
    "implementation:\n",
    "  container:\n",
    "    image: gcr.io/ml-pipeline/component-a\n",
    "    command: [python3, /pipelines/component/src/train.py]\n",
    "    args: [\n",
    "      --field-l, {inputValue: field_l},\n",
    "    ]\n",
    "    fileOutputs: \n",
    "      field_m: /schema.txt\n",
    "      field_n: /feature.txt\n",
    "      field_o: /output.txt\n",
    "```\n",
    "\n",
    "If you define the component using the function decorator, there are a list of [core types](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/types.py).\n",
    "For example, the following component declares a core type Integer for input field_l while\n",
    "declares customized_type for its output field_n.\n",
    "\n",
    "```python\n",
    "@component\n",
    "def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, \n",
    "                                           'field_n': 'customized_type',\n",
    "                                           'field_o': 'Integer'\n",
    "                                          }:\n",
    "    return ContainerOp(\n",
    "        name = 'operator a',\n",
    "        image = 'gcr.io/ml-pipeline/component-a',\n",
    "        arguments = [\n",
    "            '--field-l', field_l,\n",
    "        ],\n",
    "        file_outputs = {\n",
    "            'field_m': '/schema.txt',\n",
    "            'field_n': '/feature.txt',\n",
    "            'field_o': '/output.txt'\n",
    "        }\n",
    "    )\n",
    "```\n",
    "\n",
    "## Type check switch\n",
    "Type checking is enabled by default. It can be disabled as --disable-type-check argument if dsl-compile is run in the command line, or `dsl.compiler.Compiler().compile(type_check=False)`.\n",
    "\n",
    "If one wants to ignore the type for one parameter, call ignore_type() function in [PipelineParam](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/_pipeline_param.py).\n",
    "\n",
    "## How does type checking work?\n",
    "DSL compiler checks the type consistencies among components by checking the type_name as well as the openapi_schema_validator. Some special cases are listed here:\n",
    "1. Type checking succeed: If the upstream/downstream components lack the type information.\n",
    "2. Type checking succeed: If the type check is disabled.\n",
    "3. Type checking succeed: If the parameter type is ignored."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Setup"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "scrolled": false
   },
   "source": [
    "## Install Pipeline SDK"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Collecting https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp-experiment.tar.gz\n",
      "  Using cached https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp-experiment.tar.gz\n",
      "Requirement already satisfied, skipping upgrade: urllib3>=1.15 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.22)\n",
      "Requirement already satisfied, skipping upgrade: six>=1.10 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.11.0)\n",
      "Requirement already satisfied, skipping upgrade: certifi in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2018.11.29)\n",
      "Requirement already satisfied, skipping upgrade: python-dateutil in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2.7.5)\n",
      "Requirement already satisfied, skipping upgrade: PyYAML in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (3.13)\n",
      "Requirement already satisfied, skipping upgrade: google-cloud-storage==1.13.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.13.0)\n",
      "Requirement already satisfied, skipping upgrade: kubernetes==8.0.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (8.0.0)\n",
      "Requirement already satisfied, skipping upgrade: PyJWT==1.6.4 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.6.4)\n",
      "Requirement already satisfied, skipping upgrade: cryptography==2.4.2 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2.4.2)\n",
      "Requirement already satisfied, skipping upgrade: google-auth==1.6.1 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.6.1)\n",
      "Requirement already satisfied, skipping upgrade: requests_toolbelt==0.8.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (0.8.0)\n",
      "Requirement already satisfied, skipping upgrade: google-resumable-media>=0.3.1 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (0.3.1)\n",
      "Requirement already satisfied, skipping upgrade: google-cloud-core<0.29dev,>=0.28.0 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (0.28.1)\n",
      "Requirement already satisfied, skipping upgrade: google-api-core<2.0.0dev,>=0.1.1 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (1.6.0)\n",
      "Requirement already satisfied, skipping upgrade: adal>=1.0.2 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (1.2.1)\n",
      "Requirement already satisfied, skipping upgrade: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (0.54.0)\n",
      "Requirement already satisfied, skipping upgrade: requests-oauthlib in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (1.0.0)\n",
      "Requirement already satisfied, skipping upgrade: setuptools>=21.0.0 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (38.4.0)\n",
      "Requirement already satisfied, skipping upgrade: requests in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (2.18.4)\n",
      "Requirement already satisfied, skipping upgrade: asn1crypto>=0.21.0 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (0.24.0)\n",
      "Requirement already satisfied, skipping upgrade: idna>=2.1 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (2.6)\n",
      "Requirement already satisfied, skipping upgrade: cffi!=1.11.3,>=1.7 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (1.11.4)\n",
      "Requirement already satisfied, skipping upgrade: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (0.2.2)\n",
      "Requirement already satisfied, skipping upgrade: rsa>=3.1.4 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (4.0)\n",
      "Requirement already satisfied, skipping upgrade: cachetools>=2.0.0 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (3.0.0)\n",
      "Requirement already satisfied, skipping upgrade: pytz in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (2018.7)\n",
      "Requirement already satisfied, skipping upgrade: googleapis-common-protos!=1.5.4,<2.0dev,>=1.5.3 in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (1.5.5)\n",
      "Requirement already satisfied, skipping upgrade: protobuf>=3.4.0 in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (3.6.1)\n",
      "Requirement already satisfied, skipping upgrade: oauthlib>=0.6.2 in /opt/conda/lib/python3.6/site-packages (from requests-oauthlib->kubernetes==8.0.0->kfp==0.1) (2.1.0)\n",
      "Requirement already satisfied, skipping upgrade: chardet<3.1.0,>=3.0.2 in /opt/conda/lib/python3.6/site-packages (from requests->kubernetes==8.0.0->kfp==0.1) (3.0.4)\n",
      "Requirement already satisfied, skipping upgrade: pycparser in /opt/conda/lib/python3.6/site-packages (from cffi!=1.11.3,>=1.7->cryptography==2.4.2->kfp==0.1) (2.18)\n",
      "Requirement already satisfied, skipping upgrade: pyasn1<0.5.0,>=0.4.1 in /opt/conda/lib/python3.6/site-packages (from pyasn1-modules>=0.2.1->google-auth==1.6.1->kfp==0.1) (0.4.4)\n",
      "Building wheels for collected packages: kfp\n",
      "  Running setup.py bdist_wheel for kfp ... \u001b[?25ldone\n",
      "\u001b[?25h  Stored in directory: /home/jovyan/.cache/pip/wheels/06/14/fc/dd58bcc821d8067efa74a9e217db214d8a075c6b5d31ff24cf\n",
      "Successfully built kfp\n",
      "Installing collected packages: kfp\n",
      "  Found existing installation: kfp 0.1\n",
      "    Uninstalling kfp-0.1:\n",
      "      Successfully uninstalled kfp-0.1\n",
      "Successfully installed kfp-0.1\n",
      "\u001b[33mYou are using pip version 18.1, however version 19.0.3 is available.\n",
      "You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "!python3 -m pip install 'kfp>=0.1.31' --quiet\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Type Check with YAML components: successful scenario"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author components in YAML"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# In yaml, one can optionally add the type information to both inputs and outputs.\n",
    "# There are two ways to define the types: string or a dictionary with the openapi_schema_validator property.\n",
    "# The openapi_schema_validator is a json schema object that describes schema of the parameter value.\n",
    "component_a = '''\\\n",
    "name: component a\n",
    "description: component a desc\n",
    "inputs:\n",
    "  - {name: field_l, type: Integer}\n",
    "outputs:\n",
    "  - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
    "  - {name: field_n, type: customized_type}\n",
    "  - {name: field_o, type: GcsUri} \n",
    "implementation:\n",
    "  container:\n",
    "    image: gcr.io/ml-pipeline/component-a\n",
    "    command: [python3, /pipelines/component/src/train.py]\n",
    "    args: [\n",
    "      --field-l, {inputValue: field_l},\n",
    "    ]\n",
    "    fileOutputs: \n",
    "      field_m: /schema.txt\n",
    "      field_n: /feature.txt\n",
    "      field_o: /output.txt\n",
    "'''\n",
    "component_b = '''\\\n",
    "name: component b\n",
    "description: component b desc\n",
    "inputs:\n",
    "  - {name: field_x, type: customized_type}\n",
    "  - {name: field_y, type: GcsUri}\n",
    "  - {name: field_z, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
    "outputs:\n",
    "  - {name: output_model_uri, type: GcsUri}\n",
    "implementation:\n",
    "  container:\n",
    "    image: gcr.io/ml-pipeline/component-a\n",
    "    command: [python3]\n",
    "    args: [\n",
    "      --field-x, {inputValue: field_x},\n",
    "      --field-y, {inputValue: field_y},\n",
    "      --field-z, {inputValue: field_z},\n",
    "    ]\n",
    "    fileOutputs: \n",
    "      output_model_uri: /schema.txt\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author a pipeline with the above components"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "import kfp.dsl as dsl\n",
    "import kfp.compiler as compiler\n",
    "# The components are loaded as task factories that generate container_ops.\n",
    "task_factory_a = comp.load_component_from_text(text=component_a)\n",
    "task_factory_b = comp.load_component_from_text(text=component_b)\n",
    "\n",
    "#Use the component as part of the pipeline\n",
    "@dsl.pipeline(name='type_check_a',\n",
    "    description='')\n",
    "def pipeline_a():\n",
    "    a = task_factory_a(field_l=12)\n",
    "    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
    "\n",
    "compiler.Compiler().compile(pipeline_a, 'pipeline_a.zip', type_check=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Type Check with YAML components: failed scenario"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author components in YAML"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# In this case, the component_a contains an output field_o as GcrUri \n",
    "# but the component_b requires an input field_y as GcsUri\n",
    "component_a = '''\\\n",
    "name: component a\n",
    "description: component a desc\n",
    "inputs:\n",
    "  - {name: field_l, type: Integer}\n",
    "outputs:\n",
    "  - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
    "  - {name: field_n, type: customized_type}\n",
    "  - {name: field_o, type: GcrUri} \n",
    "implementation:\n",
    "  container:\n",
    "    image: gcr.io/ml-pipeline/component-a\n",
    "    command: [python3, /pipelines/component/src/train.py]\n",
    "    args: [\n",
    "      --field-l, {inputValue: field_l},\n",
    "    ]\n",
    "    fileOutputs: \n",
    "      field_m: /schema.txt\n",
    "      field_n: /feature.txt\n",
    "      field_o: /output.txt\n",
    "'''\n",
    "component_b = '''\\\n",
    "name: component b\n",
    "description: component b desc\n",
    "inputs:\n",
    "  - {name: field_x, type: customized_type}\n",
    "  - {name: field_y, type: GcsUri}\n",
    "  - {name: field_z, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
    "outputs:\n",
    "  - {name: output_model_uri, type: GcsUri}\n",
    "implementation:\n",
    "  container:\n",
    "    image: gcr.io/ml-pipeline/component-a\n",
    "    command: [python3]\n",
    "    args: [\n",
    "      --field-x, {inputValue: field_x},\n",
    "      --field-y, {inputValue: field_y},\n",
    "      --field-z, {inputValue: field_z},\n",
    "    ]\n",
    "    fileOutputs: \n",
    "      output_model_uri: /schema.txt\n",
    "'''"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author a pipeline with the above components"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "type name GcrUri is different from expected: GcsUri\n",
      "Component \"component b\" is expecting field_y to be type(GcsUri), but the passed argument is type(GcrUri)\n"
     ]
    }
   ],
   "source": [
    "import kfp.components as comp\n",
    "import kfp.dsl as dsl\n",
    "import kfp.compiler as compiler\n",
    "from kfp.dsl.types import InconsistentTypeException\n",
    "task_factory_a = comp.load_component_from_text(text=component_a)\n",
    "task_factory_b = comp.load_component_from_text(text=component_b)\n",
    "\n",
    "#Use the component as part of the pipeline\n",
    "@dsl.pipeline(name='type_check_b',\n",
    "    description='')\n",
    "def pipeline_b():\n",
    "    a = task_factory_a(field_l=12)\n",
    "    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
    "\n",
    "try:\n",
    "    compiler.Compiler().compile(pipeline_b, 'pipeline_b.zip', type_check=True)\n",
    "except InconsistentTypeException as e:\n",
    "    print(e)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Author a pipeline with the above components but type checking disabled."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Disable the type_check\n",
    "compiler.Compiler().compile(pipeline_b, 'pipeline_b.zip', type_check=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Type Check with decorated components: successful scenario"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author components with decorator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "from kfp.dsl import component\n",
    "from kfp.dsl.types import Integer, GCSPath\n",
    "from kfp.dsl import ContainerOp\n",
    "# when components are defined based on the component decorator,\n",
    "# the type information is annotated to the input or function returns.\n",
    "# There are two ways to define the type: string or a dictionary with the openapi_schema_validator property\n",
    "@component\n",
    "def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, \n",
    "                                           'field_n': 'customized_type',\n",
    "                                           'field_o': 'Integer'\n",
    "                                          }:\n",
    "    return ContainerOp(\n",
    "        name = 'operator a',\n",
    "        image = 'gcr.io/ml-pipeline/component-a',\n",
    "        arguments = [\n",
    "            '--field-l', field_l,\n",
    "        ],\n",
    "        file_outputs = {\n",
    "            'field_m': '/schema.txt',\n",
    "            'field_n': '/feature.txt',\n",
    "            'field_o': '/output.txt'\n",
    "        }\n",
    "    )\n",
    "\n",
    "# Users can also use the core types that are pre-defined in the SDK.\n",
    "# For a full list of core types, check out: https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/types.py\n",
    "@component\n",
    "def task_factory_b(field_x: 'customized_type',\n",
    "        field_y: Integer(),\n",
    "        field_z: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}) -> {'output_model_uri': 'GcsUri'}:\n",
    "    return ContainerOp(\n",
    "        name = 'operator b',\n",
    "        image = 'gcr.io/ml-pipeline/component-a',\n",
    "        command = [\n",
    "            'python3',\n",
    "            field_x,\n",
    "        ],\n",
    "        arguments = [\n",
    "            '--field-y', field_y,\n",
    "            '--field-z', field_z,\n",
    "        ],\n",
    "        file_outputs = {\n",
    "            'output_model_uri': '/schema.txt',\n",
    "        }\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author a pipeline with the above components"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Use the component as part of the pipeline\n",
    "@dsl.pipeline(name='type_check_c',\n",
    "    description='')\n",
    "def pipeline_c():\n",
    "    a = task_factory_a(field_l=12)\n",
    "    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
    "\n",
    "compiler.Compiler().compile(pipeline_c, 'pipeline_c.zip', type_check=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Type Check with decorated components: failure scenario"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author components with decorator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "from kfp.dsl import component\n",
    "from kfp.dsl.types import Integer, GCSPath\n",
    "from kfp.dsl import ContainerOp\n",
    "# task_factory_a outputs an input field_m with the openapi_schema_validator different\n",
    "# from the task_factory_b's input field_z.\n",
    "# One is gs:// and the other is gcs://\n",
    "@component\n",
    "def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, \n",
    "                                           'field_n': 'customized_type',\n",
    "                                           'field_o': 'Integer'\n",
    "                                          }:\n",
    "    return ContainerOp(\n",
    "        name = 'operator a',\n",
    "        image = 'gcr.io/ml-pipeline/component-a',\n",
    "        arguments = [\n",
    "            '--field-l', field_l,\n",
    "        ],\n",
    "        file_outputs = {\n",
    "            'field_m': '/schema.txt',\n",
    "            'field_n': '/feature.txt',\n",
    "            'field_o': '/output.txt'\n",
    "        }\n",
    "    )\n",
    "\n",
    "@component\n",
    "def task_factory_b(field_x: 'customized_type',\n",
    "        field_y: Integer(),\n",
    "        field_z: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gcs://.*$\"}'}}) -> {'output_model_uri': 'GcsUri'}:\n",
    "    return ContainerOp(\n",
    "        name = 'operator b',\n",
    "        image = 'gcr.io/ml-pipeline/component-a',\n",
    "        command = [\n",
    "            'python3',\n",
    "            field_x,\n",
    "        ],\n",
    "        arguments = [\n",
    "            '--field-y', field_y,\n",
    "            '--field-z', field_z,\n",
    "        ],\n",
    "        file_outputs = {\n",
    "            'output_model_uri': '/schema.txt',\n",
    "        }\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author a pipeline with the above components"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "GCSPath has a property openapi_schema_validator with value: {\"type\": \"string\", \"pattern\": \"^gs://.*$\"} and {\"type\": \"string\", \"pattern\": \"^gcs://.*$\"}\n",
      "Component \"task_factory_b\" is expecting field_z to be type({'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gcs://.*$\"}'}}), but the passed argument is type({'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}})\n"
     ]
    }
   ],
   "source": [
    "#Use the component as part of the pipeline\n",
    "@dsl.pipeline(name='type_check_d',\n",
    "    description='')\n",
    "def pipeline_d():\n",
    "    a = task_factory_a(field_l=12)\n",
    "    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
    "\n",
    "try:\n",
    "    compiler.Compiler().compile(pipeline_d, 'pipeline_d.zip', type_check=True)\n",
    "except InconsistentTypeException as e:\n",
    "    print(e)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Author a pipeline with the above components but ignoring types."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Use the component as part of the pipeline\n",
    "@dsl.pipeline(name='type_check_d',\n",
    "    description='')\n",
    "def pipeline_d():\n",
    "    a = task_factory_a(field_l=12)\n",
    "    # For each of the arguments, authors can also ignore the types by calling ignore_type function.\n",
    "    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'].ignore_type())\n",
    "compiler.Compiler().compile(pipeline_d, 'pipeline_d.zip', type_check=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Type Check with missing type information"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author components(with missing types)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "from kfp.dsl import component\n",
    "from kfp.dsl.types import Integer, GCSPath\n",
    "from kfp.dsl import ContainerOp\n",
    "# task_factory_a lacks the type information for output filed_n\n",
    "# task_factory_b lacks the type information for input field_y\n",
    "# When no type information is provided, it matches all types.\n",
    "@component\n",
    "def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, \n",
    "                                           'field_o': 'Integer'\n",
    "                                          }:\n",
    "    return ContainerOp(\n",
    "        name = 'operator a',\n",
    "        image = 'gcr.io/ml-pipeline/component-a',\n",
    "        arguments = [\n",
    "            '--field-l', field_l,\n",
    "        ],\n",
    "        file_outputs = {\n",
    "            'field_m': '/schema.txt',\n",
    "            'field_n': '/feature.txt',\n",
    "            'field_o': '/output.txt'\n",
    "        }\n",
    "    )\n",
    "\n",
    "@component\n",
    "def task_factory_b(field_x: 'customized_type',\n",
    "        field_y,\n",
    "        field_z: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}) -> {'output_model_uri': 'GcsUri'}:\n",
    "    return ContainerOp(\n",
    "        name = 'operator b',\n",
    "        image = 'gcr.io/ml-pipeline/component-a',\n",
    "        command = [\n",
    "            'python3',\n",
    "            field_x,\n",
    "        ],\n",
    "        arguments = [\n",
    "            '--field-y', field_y,\n",
    "            '--field-z', field_z,\n",
    "        ],\n",
    "        file_outputs = {\n",
    "            'output_model_uri': '/schema.txt',\n",
    "        }\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Author a pipeline with the above components"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Use the component as part of the pipeline\n",
    "@dsl.pipeline(name='type_check_e',\n",
    "    description='')\n",
    "def pipeline_e():\n",
    "    a = task_factory_a(field_l=12)\n",
    "    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
    "\n",
    "compiler.Compiler().compile(pipeline_e, 'pipeline_e.zip', type_check=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Type Check with both named arguments and positional arguments"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Use the component as part of the pipeline\n",
    "@dsl.pipeline(name='type_check_f',\n",
    "    description='')\n",
    "def pipeline_f():\n",
    "    a = task_factory_a(field_l=12)\n",
    "    b = task_factory_b(a.outputs['field_n'], a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
    "\n",
    "compiler.Compiler().compile(pipeline_f, 'pipeline_f.zip', type_check=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Type Check between pipeline parameters and component parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Integer has a property openapi_schema_validator that the latter does not.\n",
      "Component \"task_factory_a\" is expecting field_o to be type(Integer), but the passed argument is type({'Integer': {'openapi_schema_validator': {'type': 'integer'}}})\n"
     ]
    }
   ],
   "source": [
    "@component\n",
    "def task_factory_a(field_m: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, field_o: 'Integer'):\n",
    "    return ContainerOp(\n",
    "        name = 'operator a',\n",
    "        image = 'gcr.io/ml-pipeline/component-b',\n",
    "        arguments = [\n",
    "            '--field-l', field_m,\n",
    "            '--field-o', field_o,\n",
    "        ],\n",
    "    )\n",
    "\n",
    "# Pipeline input types are also checked against the component I/O types.\n",
    "@dsl.pipeline(name='type_check_g',\n",
    "    description='')\n",
    "def pipeline_g(a: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}='gs://kfp-path', b: Integer()=12):\n",
    "    task_factory_a(field_m=a, field_o=b)\n",
    "\n",
    "try:\n",
    "    compiler.Compiler().compile(pipeline_g, 'pipeline_g.zip', type_check=True)\n",
    "except InconsistentTypeException as e:\n",
    "    print(e)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Clean up"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pathlib import Path\n",
    "for p in Path(\".\").glob(\"pipeline_[a-g].zip\"):\n",
    "    p.unlink()"
   ]
  }
 ],
 "metadata": {
  "celltoolbar": "Tags",
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  },
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "source": [],
    "metadata": {
     "collapsed": false
    }
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}